package com.opencee.cloud.msg.service.impl;

import cn.hutool.core.date.DateUtil;
import cn.hutool.core.thread.ThreadUtil;
import cn.hutool.core.util.RandomUtil;
import cn.hutool.core.util.StrUtil;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.aliyuncs.CommonRequest;
import com.aliyuncs.CommonResponse;
import com.aliyuncs.DefaultAcsClient;
import com.aliyuncs.IAcsClient;
import com.aliyuncs.exceptions.ClientException;
import com.aliyuncs.http.MethodType;
import com.aliyuncs.profile.DefaultProfile;
import com.opencee.cloud.msg.api.constatns.*;
import com.opencee.cloud.msg.api.entity.MsgSmsTemplateEntity;
import com.opencee.cloud.msg.api.entity.MsgTaskSmsRecordsEntity;
import com.opencee.cloud.msg.api.vo.DelayedMessage;
import com.opencee.cloud.msg.api.vo.params.SmsBatchParams;
import com.opencee.cloud.msg.api.vo.params.TopicMessage;
import com.opencee.cloud.msg.api.vo.params.TopicParams;
import com.opencee.cloud.msg.config.SmsProperties;
import com.opencee.cloud.msg.mq.QueueConfiguration;
import com.opencee.cloud.msg.service.MsgTaskSmsRecordsService;
import com.opencee.cloud.msg.service.MsgSmsSenderService;
import com.opencee.cloud.msg.service.SmsTemplateService;
import com.opencee.cloud.msg.utils.MqUtil;
import com.opencee.common.utils.RedisTemplateUtil;
import com.tencentcloudapi.common.Credential;
import com.tencentcloudapi.common.profile.ClientProfile;
import com.tencentcloudapi.common.profile.HttpProfile;
import com.tencentcloudapi.sms.v20190711.SmsClient;
import com.tencentcloudapi.sms.v20190711.models.SendSmsRequest;
import com.tencentcloudapi.sms.v20190711.models.SendSmsResponse;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.stereotype.Service;
import org.springframework.util.Assert;

import java.math.BigDecimal;
import java.util.*;
import java.util.concurrent.*;

/**
 * 短信发送实现类
 *
 * @author yadu
 */
@Slf4j
@Service
@RefreshScope
public class SmsServiceImpl implements MsgSmsSenderService {
    /**
     * Worker pool used by {@code batchSend}. Core and maximum size are both twice the number of
     * available processors, and pending tasks wait in an unbounded {@link LinkedBlockingDeque}.
     * NOTE(review): no shutdown of this pool is visible in this class — confirm how it is released
     * when the bean is destroyed or refreshed.
     */
    private Integer availableProcessors = Runtime.getRuntime().availableProcessors();
    private Integer numOfThreads = availableProcessors * 2;
    private ExecutorService executorService = new ThreadPoolExecutor(numOfThreads, numOfThreads, 0, TimeUnit.MILLISECONDS, new LinkedBlockingDeque<>());
    /**
     * Number of recipients handled by each task submitted to the worker pool.
     */
    private static final int SIZE = 100;

    /**
     * International dialing prefix prepended to phone numbers on the non-Aliyun (Tencent Cloud) path.
     */
    private static final String CN_PREFIX = "+86";
    /**
     * Cache key prefix for SMS verification codes; the full key is prefix + template code + ":" + mobile.
     */
    public static final String CACHE_SMS_CODE = "sip:msg:sms_code:";
    /**
     * Cache key prefix for SMS verification-code limiting (not referenced inside this class).
     */
    public static final String CACHE_SMS_CODE_LIMIT = "sip:msg:sms_code_limit:";

    /**
     * Lifetime of a cached verification code, in minutes.
     */
    private static final int CACHE_SMS_CODE_TTL = 5;

    /**
     * Template parameter keys for verification-code messages.
     * {@code PARAMS_TTL_KEY} is declared but not referenced inside this class.
     */
    private static final String PARAMS_CODE_KEY = "code";
    private static final String PARAMS_TTL_KEY = "time";

    @Autowired
    private RedisTemplateUtil redisUtil;
    @Autowired
    private SmsTemplateService smsTemplateService;
    @Autowired
    private MsgTaskSmsRecordsService smsSendRecordsService;
    @Autowired
    private SmsSendDetailsService smsSendDetailsService;
    @Autowired
    private SmsProperties smsProperties;
    @Autowired
    private AmqpTemplate amqpTemplate;
    @Autowired
    private SmsBlackListService smsBlackListService;


    /**
     * Records an SMS send request and dispatches it when it is due.
     * <p>
     * A send record is always persisted, even when validation fails; in that case it is stored
     * with state {@code ERROR} and the failure message as remark, and nothing is sent.
     * For a valid request:
     * <ul>
     *   <li>no delay time, or a delay time that has already passed: the record is marked
     *       {@code SENDING}, sent through {@code batchSend}, then marked {@code COMPLETED};</li>
     *   <li>a delay time later today: the record is handed to {@link #addDelayQueue};</li>
     *   <li>a delay time on another day: the record is only stored here.</li>
     * </ul>
     *
     * @param message the batch send request
     * @return {@code true} only if at least one message was sent successfully by this call
     */
    @Override
    public boolean send(SmsBatchParams message) {
        MsgTaskSmsRecordsEntity record = new MsgTaskSmsRecordsEntity();
        // Mobile numbers to send to.
        // NOTE(review): nothing in this method adds numbers to this set, so batchSend(...) below
        // always receives an empty collection and reports 0 — confirm where recipients should come from.
        Set<String> mobileSet = new HashSet<>();
        try {
            Assert.hasText(message.getSourceID(), "sourceID不能为空");
            Assert.hasText(message.getServiceID(), "serviceID不能为空");
            Assert.notNull(message.getTemplateCode(), "templateCode不能为空");
            // Default state until validation says otherwise.
            record.setState(MsgSentStatus.UNSENT.ordinal());
            // Copy the template snapshot onto the record.
            MsgSmsTemplateEntity tpl = smsTemplateService.getByTplCode(message.getTemplateCode());
            Assert.notNull(tpl, "模板信息不存在");
            record.setTplCode(tpl.getTemplateCode());
            record.setTplId(tpl.getTplId());
            record.setTplSignName(tpl.getSignName());
            record.setChannel(tpl.getChannel());
            record.setTplContent(tpl.getTemplateContent());
            record.setChannelTplCode(tpl.getTemplateChannelCode());
            record.setSmsType(tpl.getTemplateType());
            Assert.isTrue(SmsChannel.isInclude(tpl.getChannel()), "短信通道暂不支持");
        } catch (Exception e) {
            record.setState(MsgSentStatus.ERROR.ordinal());
            record.setRemark(e.getMessage());
            // Pass the exception itself so the stack trace is not lost.
            log.error(e.getMessage(), e);
        } finally {
            record.setServiceId(message.getServiceID());
            record.setSourceId(message.getSourceID());
            record.setTplCode(message.getTemplateCode());
            record.setSuccessCount(0);
            record.setDeleted(0);
            record.setDelayTime(message.getTiming());
            record.setCreateTime(new Date());
            record.setUpdateTime(record.getCreateTime());
            // Persist the send record whether or not validation succeeded.
            smsSendRecordsService.save(record);
        }

        if (record.getState().intValue() != MsgSentStatus.UNSENT.ordinal()) {
            // Validation failed; the record was stored with state ERROR.
            return false;
        }

        Date delayTime = record.getDelayTime();
        boolean dueNow = delayTime == null || delayTime.getTime() - System.currentTimeMillis() <= 0;
        if (!dueNow) {
            if (DateUtil.isSameDay(new Date(), delayTime)) {
                // Scheduled for later today: goes straight to the delay queue.
                addDelayQueue(record);
            }
            return false;
        }

        if (record.getSmsType().intValue() == MsgSmsType.CODE.ordinal()) {
            // Verification-code template: generate one code per mobile number.
            List<Map<String, Object>> tplParams = randomCodeParamsList(mobileSet, record.getTplCode());
            record.setTplParams(JSONObject.toJSONString(tplParams));
        }

        // Mark as sending.
        record.setState(MsgSentStatus.SENDING.ordinal());
        MsgTaskSmsRecordsEntity modify = new MsgTaskSmsRecordsEntity();
        modify.setBizId(record.getBizId());
        modify.setTplParams(record.getTplParams());
        modify.setState(record.getState());
        smsSendRecordsService.updateById(modify);

        int successCount = batchSend(mobileSet, record);

        // Mark as completed and store the success count.
        record.setState(MsgSentStatus.COMPLETED.ordinal());
        modify.setSuccessCount(successCount);
        modify.setState(record.getState());
        smsSendRecordsService.updateById(modify);
        return successCount > 0;
    }

    /**
     * Sends a previously scheduled SMS record.
     * <p>
     * Only records currently in state {@code WAITING} are processed. The record is marked
     * {@code SENDING}, sent through {@code batchSend}, then marked {@code COMPLETED} with the
     * number of successful sends.
     * <p>
     * Recipient resolution for the {@code USER}, {@code ROLE} and {@code WORKSPACE_SCOPE} receiver
     * types is not implemented here, so those records are sent to nobody. Any other receiver type,
     * including a missing one, uses the record's comma-separated receiver list.
     *
     * @param bizId id of the send record
     * @return {@code true} only if at least one message was sent successfully
     */
    @Override
    public boolean sendDelayed(Long bizId) {
        MsgTaskSmsRecordsEntity record = smsSendRecordsService.getById(bizId);
        if (record == null || record.getState() == null
                || record.getState().intValue() != MsgSentStatus.WAITING.ordinal()) {
            // Unknown record, or not waiting to be sent: nothing to do.
            return false;
        }

        // A missing receiver type is treated like the default type instead of failing with an NPE;
        // -1 can never match an enum ordinal.
        int receiverType = record.getReceiverType() == null ? -1 : record.getReceiverType().intValue();

        Set<String> mobileSet = new HashSet<>();
        if (receiverType == ReceiverType.USER.ordinal()) {
            // TODO: resolve mobile numbers from users (not implemented).
        } else if (receiverType == ReceiverType.ROLE.ordinal()) {
            // TODO: resolve mobile numbers from roles (not implemented).
        } else if (receiverType == ReceiverType.WORKSPACE_SCOPE.ordinal()) {
            // TODO: resolve mobile numbers from the responsibility area (not implemented).
        } else {
            // Default receiver type: the receiver list already holds mobile numbers.
            mobileSet.addAll(StrUtil.split(record.getReceiverList(), ','));
        }

        if (record.getSmsType().intValue() == MsgSmsType.CODE.ordinal()) {
            // Verification-code template: generate one code per mobile number.
            List<Map<String, Object>> tplParams = randomCodeParamsList(mobileSet, record.getTplCode());
            record.setTplParams(JSONObject.toJSONString(tplParams));
        }

        // Mark as sending.
        record.setState(MsgSentStatus.SENDING.ordinal());
        MsgTaskSmsRecordsEntity modify = new MsgTaskSmsRecordsEntity();
        modify.setBizId(record.getBizId());
        modify.setTplParams(record.getTplParams());
        modify.setState(record.getState());
        smsSendRecordsService.updateById(modify);

        int successCount = batchSend(mobileSet, record);

        // Mark as completed and store the success count.
        record.setState(MsgSentStatus.COMPLETED.ordinal());
        modify.setSuccessCount(successCount);
        modify.setState(record.getState());
        smsSendRecordsService.updateById(modify);
        return successCount > 0;
    }

    /**
     * Loads unsent scheduled records and hands each one to {@link #addDelayQueue}.
     *
     * @param beforeDay look-ahead value passed to the record query as {@code "before"};
     *                  defaults to 1 when {@code null}
     * @return number of records returned by the query, or 0 when the query returns
     *         {@code null} or an empty list
     */
    @Override
    public Integer beforeLoadDelayQueue(Integer beforeDay) {
        // Raw Map kept on purpose: the parameter type of findList is declared outside this file.
        Map map = new HashMap();
        map.put("before", beforeDay == null ? 1 : beforeDay);
        map.put("state", MsgSentStatus.UNSENT.ordinal());
        List<MsgTaskSmsRecordsEntity> list = smsSendRecordsService.findList(map);
        if (list == null || list.isEmpty()) {
            // Previously a null result fell through to list.size() and threw an NPE.
            return 0;
        }
        list.forEach(this::addDelayQueue);
        return list.size();
    }

    /**
     * Sends the record's template to every mobile number and returns how many sends succeeded.
     * <p>
     * Recipients are split into chunks of {@link #SIZE}. Each chunk is submitted to the worker
     * pool as one task, with a 200 ms pause after each submission to avoid hammering the provider
     * API. After all tasks finish, an {@code SMS_DONE} topic message carrying the record is published.
     * <p>
     * The JSON array in the record's template params is matched to recipients by position in the
     * iteration order of {@code mobiles}.
     *
     * @param mobiles recipients; {@code null} or empty sends nothing and returns 0
     * @param record  send record supplying channel, signature, template and template params
     * @return total number of sends counted as successful across all chunks
     */
    private int batchSend(final Set<String> mobiles, final MsgTaskSmsRecordsEntity record) {
        if (mobiles == null || mobiles.isEmpty()) {
            return 0;
        }
        if (log.isDebugEnabled()) {
            log.debug("是否开启短信发送:【{}】", smsProperties.isEnabled());
        }
        final List<String> list = new ArrayList<>(mobiles);
        final long begin = System.nanoTime();
        final List<Map> tplParamList = JSONObject.parseArray(record.getTplParams(), Map.class);
        final int totalCount = list.size();
        // One task per chunk of SIZE recipients (ceiling division).
        final int chunkCount = (totalCount + SIZE - 1) / SIZE;
        final List<Future<Integer>> futureList = new ArrayList<>(chunkCount);

        for (int i = 0; i < chunkCount; i++) {
            final int beginIndex = i * SIZE;
            final int endIndex = Math.min(beginIndex + SIZE, totalCount);
            final Callable<Integer> task = () -> sendChunk(list, beginIndex, endIndex, tplParamList, record);
            futureList.add(executorService.submit(task));
            // Pause briefly so the provider API is not called too frequently.
            ThreadUtil.sleep(200);
        }

        if (log.isDebugEnabled()) {
            log.debug("-----------------------" + (System.nanoTime() - begin) / 1000_000d + "-----------------------");
        }

        // Wait for every chunk and add up the per-chunk success counts.
        int successCount = 0;
        for (Future<Integer> result : futureList) {
            try {
                successCount += result.get();
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop waiting; further get() calls would fail the same way.
                Thread.currentThread().interrupt();
                log.error("等待短信批量发送结果被中断", e);
                break;
            } catch (Exception e) {
                log.error("短信批量发送任务执行失败", e);
            }
        }
        if (log.isDebugEnabled()) {
            log.debug("发送消息结束，耗时：" + (System.nanoTime() - begin) / 1000_000d);
        }

        TopicParams topicMessageParams = new TopicParams();
        topicMessageParams.add(new TopicMessage(record.getTenantId(), TopicType.SMS_DONE.getRouteKey(), TopicType.SMS_DONE.getAction(), record));
        MqUtil.topic(amqpTemplate, topicMessageParams);
        return successCount;
    }

    /**
     * Sends to the recipients in {@code [beginIndex, endIndex)} and stores one detail row per recipient.
     * <p>
     * A send counts as successful when the provider call does not throw (or when sending is
     * disabled in the properties). NOTE(review): the provider response body is stored but not
     * inspected for an error code — confirm whether that is intended.
     *
     * @return number of sends in this chunk counted as successful
     */
    private int sendChunk(List<String> mobiles, int beginIndex, int endIndex,
                          List<Map> tplParamList, MsgTaskSmsRecordsEntity record) {
        int success = 0;
        List<SmsSendDetails> detailsList = new ArrayList<>(endIndex - beginIndex);
        Date sendTime = new Date();
        for (int index = beginIndex; index < endIndex; index++) {
            String mobile = mobiles.get(index);
            // Use the GLOBAL index: the params list is aligned with the full recipient list,
            // not with this chunk.
            Map<String, Object> tplParams = tplParamsAt(tplParamList, index);
            SmsSendDetails detail = new SmsSendDetails();
            detail.setChannel(SmsChannel.valueOf(record.getChannel().intValue()).name());
            detail.setSendTime(sendTime);
            detail.setPhoneNum(mobile);
            detail.setDeleted(0);
            detail.setContent(StrUtil.format(record.getTplContent(), tplParams));
            detail.setBizId(record.getBizId());
            if (smsBlackListService.existsCache(mobile)) {
                detail.setResult(0);
                detail.setRemark("短信黑名单");
            } else {
                JSONObject requestParams = new JSONObject();
                JSONObject responseResult = new JSONObject();
                try {
                    String response = null;
                    if (record.getChannel().intValue() == SmsChannel.ALI_YUN.ordinal()) {
                        requestParams.put("PhoneNumbers", mobile);
                        requestParams.put("TemplateParam", JSONObject.toJSONString(tplParams));
                        requestParams.put("SignName", record.getTplSignName());
                        requestParams.put("TemplateCode", record.getChannelTplCode());
                        if (smsProperties.isEnabled()) {
                            // Aliyun send.
                            response = aliyunSendSms(requestParams);
                        }
                    } else {
                        JSONArray phoneSet = new JSONArray();
                        phoneSet.add(CN_PREFIX + mobile);
                        JSONArray tplParamsSet = new JSONArray();
                        tplParamsSet.addAll(tplParams.values());
                        requestParams.put("PhoneNumberSet", phoneSet);
                        requestParams.put("TemplateParamSet", tplParamsSet);
                        requestParams.put("Sign", record.getTplSignName());
                        requestParams.put("TemplateID", record.getChannelTplCode());
                        if (smsProperties.isEnabled()) {
                            // Tencent Cloud send.
                            response = tencentSendSms(requestParams);
                        }
                    }
                    JSONObject parsed = response == null ? null : JSONObject.parseObject(response);
                    if (parsed != null) {
                        // Keep the empty default when the provider returns nothing parseable.
                        responseResult = parsed;
                    }
                    success += 1;
                } catch (Exception e) {
                    log.warn("短信发送失败, bizId={}", record.getBizId(), e);
                    detail.setResult(0);
                    detail.setRemark((detail.getRemark() == null ? "" : detail.getRemark() + ",") + "错误信息：" + e.getMessage());
                }
                detail.setRequestParams(requestParams.toJSONString());
                detail.setResponseResult(responseResult.toJSONString());
            }
            detailsList.add(detail);
        }
        // Persist the per-recipient details for this chunk.
        smsSendDetailsService.saveBatch(detailsList);
        if (log.isDebugEnabled()) {
            log.debug("批量发送list范围: {}-{} 发送数量:【{}】", beginIndex, endIndex, endIndex - beginIndex);
        }
        return success;
    }

    /**
     * Returns the template params for the recipient at {@code index}, or an empty map when the
     * list is missing, too short, or holds {@code null} at that position.
     */
    @SuppressWarnings("unchecked")
    private static Map<String, Object> tplParamsAt(List<Map> tplParamList, int index) {
        if (tplParamList == null || index >= tplParamList.size()) {
            return Collections.emptyMap();
        }
        Map params = tplParamList.get(index);
        if (params == null) {
            return Collections.emptyMap();
        }
        // Parsed from a JSON object, so keys are strings.
        return params;
    }


    /**
     * Sends one SMS request through the Tencent Cloud SMS client.
     * <p>
     * The configured app id is added to {@code params} under {@code "SmsSdkAppid"} before the
     * request is built, so the caller's JSON object is modified.
     *
     * @param params request fields, serialized to JSON and parsed into a {@code SendSmsRequest}
     * @return the provider response serialized as a JSON string
     * @throws Exception if building the request or calling the provider fails
     */
    private String tencentSendSms(JSONObject params) throws Exception {
        Credential credential = new Credential(
                smsProperties.getTencent().getappKeyId(),
                smsProperties.getTencent().getappSecret());

        HttpProfile http = new HttpProfile();
        http.setEndpoint("sms.tencentcloudapi.com");
        ClientProfile profile = new ClientProfile();
        profile.setHttpProfile(http);

        params.put("SmsSdkAppid", smsProperties.getTencent().getAppId());
        SmsClient smsClient = new SmsClient(credential, smsProperties.getTencent().getRegion(), profile);

        SendSmsRequest request = SendSmsRequest.fromJsonString(params.toJSONString(), SendSmsRequest.class);
        SendSmsResponse response = smsClient.SendSms(request);
        return SendSmsResponse.toJsonString(response);
    }


    /**
     * Sends one SMS request through the Aliyun {@code SendSms} action.
     * <p>
     * Every entry of {@code params} is copied into the request as a query parameter (values are
     * converted with {@code getString}); the configured region is added as {@code RegionId}.
     *
     * @param params request fields such as phone numbers, signature, template code and params
     * @return the raw response body returned by the provider
     * @throws ClientException if the provider call fails
     */
    public String aliyunSendSms(JSONObject params) throws ClientException {
        DefaultProfile profile = DefaultProfile.getProfile(
                smsProperties.getAliyun().getRegion(),
                smsProperties.getAliyun().getappKeyId(),
                smsProperties.getAliyun().getappSecret());
        IAcsClient client = new DefaultAcsClient(profile);

        CommonRequest request = new CommonRequest();
        request.setSysMethod(MethodType.POST);
        request.setSysDomain("dysmsapi.aliyuncs.com");
        request.setSysVersion("2017-05-25");
        // Single-message action (not the batch API).
        request.setSysAction("SendSms");
        for (String key : params.keySet()) {
            request.putQueryParameter(key, params.getString(key));
        }
        request.putQueryParameter("RegionId", smsProperties.getAliyun().getRegion());

        CommonResponse response = client.getCommonResponse(request);
        return response.getData();
    }


    /**
     * Publishes a scheduled send record to the delayed SMS queue and updates its state.
     * <p>
     * Nothing happens when the record is {@code null}, has no id or delay time, or when the delay
     * time is already in the past. Otherwise a delayed message carrying only the record id is
     * published with the remaining delay in milliseconds, and the record state becomes
     * {@code WAITING}. If publishing fails, the state becomes {@code ERROR} with the failure
     * message as remark.
     *
     * @param record the scheduled send record
     */
    public void addDelayQueue(final MsgTaskSmsRecordsEntity record) {
        if (record == null || record.getBizId() == null || record.getDelayTime() == null) {
            return;
        }
        long times = record.getDelayTime().getTime() - System.currentTimeMillis();
        if (times < 0) {
            return;
        }
        log.debug("定时发送短信记录：{}，延迟：{}/ms", record.getBizId(), times);
        MsgTaskSmsRecordsEntity modify = new MsgTaskSmsRecordsEntity();
        modify.setBizId(record.getBizId());
        try {
            // Only the id goes into the message to keep the queue payload small.
            JSONObject json = new JSONObject();
            json.put("bizId", record.getBizId());
            DelayedMessage delayedMessage = new DelayedMessage(record.getTenantId(), QueueConfiguration.DELAYED_SMS_QUEUE_RK, null, times, json);
            MqUtil.delayed(amqpTemplate, delayedMessage);
            // Mark as waiting.
            modify.setState(MsgSentStatus.WAITING.ordinal());
        } catch (Exception e) {
            // The remark only keeps the message; log the exception so the stack trace survives.
            log.error("定时发送短信加入延迟队列失败：{}", record.getBizId(), e);
            modify.setState(MsgSentStatus.ERROR.ordinal());
            modify.setRemark(e.getMessage());
        }
        smsSendRecordsService.updateById(modify);
    }


    /**
     * Builds one template-parameter map per mobile number, each holding a freshly generated
     * verification code under the {@code "code"} key.
     * <p>
     * The result follows the iteration order of {@code mobileSet}, so position {@code i} in the
     * list belongs to the {@code i}-th mobile number of that iteration.
     *
     * @param mobileSet mobile numbers to generate codes for
     * @param tplCode   template code passed on to the code generator
     * @return one parameter map per mobile number
     */
    private List<Map<String, Object>> randomCodeParamsList(Set<String> mobileSet, String tplCode) {
        List<Map<String, Object>> paramsPerMobile = new ArrayList<>();
        for (String mobile : mobileSet) {
            String code = randomCode(mobile, tplCode);
            Map<String, Object> entry = new HashMap<>();
            entry.put(PARAMS_CODE_KEY, code);
            paramsPerMobile.add(entry);
            if (log.isDebugEnabled()) {
                log.debug("验证码：{} -【{}】", mobile, code);
            }
        }
        return paramsPerMobile;
    }

    /**
     * Random source for verification codes. A cryptographically strong generator is used because
     * the codes act as short-lived credentials.
     */
    private static final java.security.SecureRandom CODE_RANDOM = new java.security.SecureRandom();

    /**
     * Generates a 6-digit numeric verification code and caches it for the given mobile number.
     * <p>
     * The code is stored under {@code CACHE_SMS_CODE + tplCode + ":" + mobile} with a lifetime of
     * {@code CACHE_SMS_CODE_TTL} minutes. Leading zeros are possible.
     *
     * @param mobile  mobile number the code is issued for
     * @param tplCode template code, used as part of the cache key
     * @return the generated code
     */
    private String randomCode(String mobile, String tplCode) {
        String key = CACHE_SMS_CODE + tplCode + ":" + mobile;
        // Append digit by digit: keeps leading zeros and avoids locale-dependent number formatting.
        StringBuilder digits = new StringBuilder(6);
        for (int i = 0; i < 6; i++) {
            digits.append(CODE_RANDOM.nextInt(10));
        }
        String code = digits.toString();
        redisUtil.set(key, code, CACHE_SMS_CODE_TTL, TimeUnit.MINUTES);
        return code;
    }
}
